// Copyright 2015 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "remoting/protocol/webrtc_transport.h"

#include <algorithm>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "base/base64.h"
#include "base/functional/bind.h"
#include "base/logging.h"
#include "base/memory/ptr_util.h"
#include "base/memory/raw_ref.h"
#include "base/strings/string_number_conversions.h"
#include "base/task/single_thread_task_runner.h"
#include "base/threading/thread_restrictions.h"
#include "components/webrtc/net_address_utils.h"
#include "components/webrtc/thread_wrapper.h"
#include "remoting/base/constants.h"
#include "remoting/protocol/authenticator.h"
#include "remoting/protocol/port_allocator_factory.h"
#include "remoting/protocol/sdp_message.h"
#include "remoting/protocol/transport.h"
#include "remoting/protocol/transport_context.h"
#include "remoting/protocol/webrtc_audio_module.h"
#include "third_party/abseil-cpp/absl/strings/string_view.h"
#include "third_party/libjingle_xmpp/xmllite/xmlelement.h"
#include "third_party/webrtc/api/audio_codecs/audio_decoder_factory_template.h"
#include "third_party/webrtc/api/audio_codecs/audio_encoder_factory_template.h"
#include "third_party/webrtc/api/audio_codecs/opus/audio_decoder_opus.h"
#include "third_party/webrtc/api/audio_codecs/opus/audio_encoder_opus.h"
#include "third_party/webrtc/api/call/call_factory_interface.h"
#include "third_party/webrtc/api/peer_connection_interface.h"
#include "third_party/webrtc/api/rtc_event_log/rtc_event_log_factory.h"
#include "third_party/webrtc/api/video_codecs/builtin_video_decoder_factory.h"
#include "third_party/webrtc/media/engine/webrtc_media_engine.h"
#include "third_party/webrtc/modules/audio_processing/include/audio_processing.h"
#include "third_party/webrtc_overrides/task_queue_factory.h"

#if !defined(NDEBUG)
#include "base/command_line.h"
#endif

using jingle_xmpp::QName;
using jingle_xmpp::XmlElement;

namespace remoting::protocol {

class ScopedAllowThreadJoinForWebRtcTransport
    : public base::ScopedAllowBaseSyncPrimitivesOutsideBlockingScope {};

class ScopedAllowSyncPrimitivesForWebRtcTransport
    : public base::ScopedAllowBaseSyncPrimitivesOutsideBlockingScope {};

namespace {

using DataChannelState = webrtc::DataChannelInterface::DataState;

// Delay after candidate creation before sending transport-info message to
// accumulate multiple candidates. This is an optimization to reduce number of
// transport-info messages.
const int kTransportInfoSendDelayMs = 20;

// XML namespace for the transport elements.
const char kTransportNamespace[] = "google:remoting:webrtc";

// Global maximum bitrate set for the PeerConnection.
const int kMaxBitrateBps = 1e8;  // 100 Mbps.

// Frequency of polling the event and control data channels for their current
// state while waiting for them to close.
constexpr base::TimeDelta kDefaultDataChannelStatePollingInterval =
    base::Milliseconds(50);

// The maximum amount of time we will wait for the data channels to close before
// closing the PeerConnection.
constexpr base::TimeDelta kWaitForDataChannelsClosedTimeout = base::Seconds(5);

base::TimeDelta data_channel_state_polling_interval =
    kDefaultDataChannelStatePollingInterval;

#if !defined(NDEBUG)
// Command line switch used to disable signature verification.
// TODO(sergeyu): Remove this flag.
const char kDisableAuthenticationSwitchName[] = "disable-authentication";
#endif

bool IsValidSessionDescriptionType(const std::string& type) {
  return type == webrtc::SessionDescriptionInterface::kOffer ||
         type == webrtc::SessionDescriptionInterface::kAnswer;
}

void UpdateCodecParameters(SdpMessage* sdp_message, bool incoming) {
  // Update SDP format to use 160kbps stereo for opus codec.
  if (sdp_message->has_audio() &&
      !sdp_message->AddCodecParameter("opus",
                                      "stereo=1; maxaveragebitrate=163840")) {
    if (incoming) {
      LOG(WARNING) << "Opus not found in an incoming SDP.";
    } else {
      LOG(FATAL) << "Opus not found in SDP generated by WebRTC.";
    }
  }
}

std::string GetTransportProtocol(const cricket::CandidatePair& candidate_pair) {
  const cricket::Candidate& local_candidate = candidate_pair.local_candidate();
  return (local_candidate.type() == "relay") ? local_candidate.relay_protocol()
                                             : local_candidate.protocol();
}

// Returns true if the selected candidate-pair indicates a relay connection.
absl::optional<bool> IsConnectionRelayed(
    const cricket::CandidatePair& selected_candidate_pair) {
  const cricket::Candidate& local_candidate =
      selected_candidate_pair.local_candidate();
  const cricket::Candidate& remote_candidate =
      selected_candidate_pair.remote_candidate();
  return local_candidate.type() == "relay" ||
         remote_candidate.type() == "relay";
}

// Utility function to map a cricket::Candidate string type to a
// TransportRoute::RouteType enum value.
TransportRoute::RouteType CandidateTypeToTransportRouteType(
    const std::string& candidate_type) {
  if (candidate_type == "local") {
    return TransportRoute::DIRECT;
  } else if (candidate_type == "stun" || candidate_type == "prflx") {
    return TransportRoute::STUN;
  } else if (candidate_type == "relay") {
    return TransportRoute::RELAY;
  } else {
    LOG(ERROR) << "Unknown candidate type: " << candidate_type;
    return TransportRoute::DIRECT;
  }
}

void SetSenderParameters(webrtc::RtpSenderInterface& sender,
                         const webrtc::RtpParameters& parameters) {
  ScopedAllowSyncPrimitivesForWebRtcTransport allow_wait;
  webrtc::RTCError result = sender.SetParameters(parameters);
  DCHECK(result.ok()) << "SetParameters() failed: " << result.message();
}

// Initializes default parameters for a sender that may be different from
// WebRTC's defaults.
void SetDefaultSenderParameters(
    rtc::scoped_refptr<webrtc::RtpSenderInterface> sender) {
  if (sender->media_type() == cricket::MEDIA_TYPE_VIDEO) {
    webrtc::RtpParameters parameters = sender->GetParameters();
    if (parameters.encodings.empty()) {
      LOG(ERROR) << "No encodings found for sender " << sender->id();
      return;
    }

    for (auto& encoding : parameters.encodings) {
      encoding.max_framerate = kTargetFrameRate;
    }

    SetSenderParameters(*sender, parameters);
  }
}

// A webrtc::CreateSessionDescriptionObserver implementation used to receive the
// results of creating descriptions for this end of the PeerConnection.
class CreateSessionDescriptionObserver
    : public webrtc::CreateSessionDescriptionObserver {
 public:
  typedef base::OnceCallback<void(
      std::unique_ptr<webrtc::SessionDescriptionInterface> description,
      const std::string& error)>
      ResultCallback;

  static CreateSessionDescriptionObserver* Create(
      ResultCallback result_callback) {
    return new rtc::RefCountedObject<CreateSessionDescriptionObserver>(
        std::move(result_callback));
  }

  CreateSessionDescriptionObserver(const CreateSessionDescriptionObserver&) =
      delete;
  CreateSessionDescriptionObserver& operator=(
      const CreateSessionDescriptionObserver&) = delete;

  void OnSuccess(webrtc::SessionDescriptionInterface* desc) override {
    std::move(result_callback_).Run(base::WrapUnique(desc), std::string());
  }

  void OnFailure(webrtc::RTCError error) override {
    std::move(result_callback_).Run(nullptr, error.message());
  }

 protected:
  explicit CreateSessionDescriptionObserver(ResultCallback result_callback)
      : result_callback_(std::move(result_callback)) {}
  ~CreateSessionDescriptionObserver() override = default;

 private:
  ResultCallback result_callback_;
};

// A webrtc::SetSessionDescriptionObserver implementation used to receive the
// results of setting local and remote descriptions of the PeerConnection.
class SetSessionDescriptionObserver
    : public webrtc::SetSessionDescriptionObserver {
 public:
  typedef base::OnceCallback<void(bool success, const std::string& error)>
      ResultCallback;

  static SetSessionDescriptionObserver* Create(ResultCallback result_callback) {
    return new rtc::RefCountedObject<SetSessionDescriptionObserver>(
        std::move(result_callback));
  }

  SetSessionDescriptionObserver(const SetSessionDescriptionObserver&) = delete;
  SetSessionDescriptionObserver& operator=(
      const SetSessionDescriptionObserver&) = delete;

  void OnSuccess() override {
    std::move(result_callback_).Run(true, std::string());
  }

  void OnFailure(webrtc::RTCError error) override {
    std::move(result_callback_).Run(false, error.message());
  }

 protected:
  explicit SetSessionDescriptionObserver(ResultCallback result_callback)
      : result_callback_(std::move(result_callback)) {}
  ~SetSessionDescriptionObserver() override = default;

 private:
  ResultCallback result_callback_;
};

class RtcEventLogOutput : public webrtc::RtcEventLogOutput {
 public:
  // |event_log_data| will be populated with the RTC event data during logging.
  // The caller owns |event_log_data| and must keep it alive as long as
  // WebRTC provides event logging to this instance (that is, until
  // PeerConnection::StopEventLog() is called, or the PeerConnection is
  // destroyed).
  explicit RtcEventLogOutput(WebrtcEventLogData& event_log_data)
      : event_log_data_(event_log_data) {}
  ~RtcEventLogOutput() override = default;

  RtcEventLogOutput(const RtcEventLogOutput&) = delete;
  RtcEventLogOutput& operator=(const RtcEventLogOutput&) = delete;

  // webrtc::RtcEventLogOutput interface
  bool IsActive() const override { return true; }
  bool Write(absl::string_view output) override {
    event_log_data_->Write(output);
    return true;
  }

 private:
  // Holds the recorded event log data. This buffer is owned by the caller.
  const raw_ref<WebrtcEventLogData> event_log_data_;
};

}  // namespace

class WebrtcTransport::PeerConnectionWrapper
    : public webrtc::PeerConnectionObserver {
 public:
  PeerConnectionWrapper(
      rtc::Thread* worker_thread,
      std::unique_ptr<webrtc::VideoEncoderFactory> encoder_factory,
      std::unique_ptr<cricket::PortAllocator> port_allocator,
      base::WeakPtr<WebrtcTransport> transport)
      : transport_(transport) {
    audio_module_ = new rtc::RefCountedObject<WebrtcAudioModule>();

    webrtc::PeerConnectionFactoryDependencies pcf_deps;
    pcf_deps.network_thread = worker_thread;
    pcf_deps.worker_thread = worker_thread;
    pcf_deps.signaling_thread = rtc::Thread::Current();
    pcf_deps.task_queue_factory = CreateWebRtcTaskQueueFactory();
    pcf_deps.call_factory = webrtc::CreateCallFactory();
    pcf_deps.event_log_factory = std::make_unique<webrtc::RtcEventLogFactory>(
        pcf_deps.task_queue_factory.get());
    cricket::MediaEngineDependencies media_deps;
    media_deps.task_queue_factory = pcf_deps.task_queue_factory.get();
    media_deps.adm = audio_module_;
    media_deps.audio_encoder_factory =
        webrtc::CreateAudioEncoderFactory<webrtc::AudioEncoderOpus>();
    media_deps.audio_decoder_factory =
        webrtc::CreateAudioDecoderFactory<webrtc::AudioDecoderOpus>();
    media_deps.video_encoder_factory = std::move(encoder_factory);
    media_deps.video_decoder_factory =
        webrtc::CreateBuiltinVideoDecoderFactory();
    media_deps.audio_processing = webrtc::AudioProcessingBuilder().Create();
    pcf_deps.media_engine = cricket::CreateMediaEngine(std::move(media_deps));
    peer_connection_factory_ =
        webrtc::CreateModularPeerConnectionFactory(std::move(pcf_deps));

    webrtc::PeerConnectionInterface::RTCConfiguration rtc_config;

    // Set bundle_policy and rtcp_mux_policy to ensure that all channels are
    // multiplexed over a single channel.
    rtc_config.bundle_policy =
        webrtc::PeerConnectionInterface::kBundlePolicyMaxBundle;
    rtc_config.rtcp_mux_policy =
        webrtc::PeerConnectionInterface::kRtcpMuxPolicyRequire;

    rtc_config.media_config.video.periodic_alr_bandwidth_probing = true;

    rtc_config.sdp_semantics = webrtc::SdpSemantics::kUnifiedPlan;

    webrtc::PeerConnectionDependencies dependencies(this);
    dependencies.allocator = std::move(port_allocator);
    auto result = peer_connection_factory_->CreatePeerConnectionOrError(
        rtc_config, std::move(dependencies));
    if (!result.ok()) {
      LOG(ERROR) << "CreatePeerConnection() failed: "
                 << result.error().message();
      return;
    }
    peer_connection_ = result.MoveValue();
  }

  PeerConnectionWrapper(const PeerConnectionWrapper&) = delete;
  PeerConnectionWrapper& operator=(const PeerConnectionWrapper&) = delete;

  ~PeerConnectionWrapper() override {
    {
      // |peer_connection_| creates threads internally, which are joined when
      // the connection is closed. See crbug.com/660081.
      ScopedAllowThreadJoinForWebRtcTransport allow_thread_join;
      peer_connection_->Close();
      peer_connection_ = nullptr;
      peer_connection_factory_ = nullptr;
    }

    audio_module_ = nullptr;
  }

  WebrtcAudioModule* audio_module() { return audio_module_.get(); }

  webrtc::PeerConnectionInterface* peer_connection() {
    return peer_connection_.get();
  }

  webrtc::PeerConnectionFactoryInterface* peer_connection_factory() {
    return peer_connection_factory_.get();
  }

  // webrtc::PeerConnectionObserver interface.
  void OnSignalingChange(
      webrtc::PeerConnectionInterface::SignalingState new_state) override {
    if (transport_) {
      transport_->OnSignalingChange(new_state);
    }
  }
  void OnAddStream(
      rtc::scoped_refptr<webrtc::MediaStreamInterface> stream) override {
    if (transport_) {
      transport_->OnAddStream(stream);
    }
  }
  void OnRemoveStream(
      rtc::scoped_refptr<webrtc::MediaStreamInterface> stream) override {
    if (transport_) {
      transport_->OnRemoveStream(stream);
    }
  }
  void OnDataChannel(
      rtc::scoped_refptr<webrtc::DataChannelInterface> data_channel) override {
    if (transport_) {
      transport_->OnDataChannel(data_channel);
    }
  }
  void OnRenegotiationNeeded() override {
    if (transport_) {
      transport_->OnRenegotiationNeeded();
    }
  }
  void OnIceConnectionChange(
      webrtc::PeerConnectionInterface::IceConnectionState new_state) override {
    if (transport_) {
      transport_->OnIceConnectionChange(new_state);
    }
  }
  void OnIceGatheringChange(
      webrtc::PeerConnectionInterface::IceGatheringState new_state) override {
    if (transport_) {
      transport_->OnIceGatheringChange(new_state);
    }
  }
  void OnIceCandidate(const webrtc::IceCandidateInterface* candidate) override {
    if (transport_) {
      transport_->OnIceCandidate(candidate);
    }
  }
  void OnIceSelectedCandidatePairChanged(
      const cricket::CandidatePairChangeEvent& event) override {
    if (transport_) {
      transport_->OnIceSelectedCandidatePairChanged(event);
    }
  }

 private:
  rtc::scoped_refptr<WebrtcAudioModule> audio_module_;
  rtc::scoped_refptr<webrtc::PeerConnectionFactoryInterface>
      peer_connection_factory_;
  rtc::scoped_refptr<webrtc::PeerConnectionInterface> peer_connection_;

  base::WeakPtr<WebrtcTransport> transport_;
};

WebrtcTransport::WebrtcTransport(
    rtc::Thread* worker_thread,
    scoped_refptr<TransportContext> transport_context,
    std::unique_ptr<webrtc::VideoEncoderFactory> video_encoder_factory,
    EventHandler* event_handler)
    : transport_context_(transport_context),
      event_handler_(event_handler),
      handshake_hmac_(crypto::HMAC::SHA256) {
  std::unique_ptr<cricket::PortAllocator> port_allocator =
      transport_context_->port_allocator_factory()->CreatePortAllocator(
          transport_context_, weak_factory_.GetWeakPtr());

  peer_connection_wrapper_ = std::make_unique<PeerConnectionWrapper>(
      worker_thread, std::move(video_encoder_factory),
      std::move(port_allocator), weak_factory_.GetWeakPtr());

  StartRtcEventLogging();
}

WebrtcTransport::~WebrtcTransport() {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  Close(OK);
}

webrtc::PeerConnectionInterface* WebrtcTransport::peer_connection() {
  return peer_connection_wrapper_ ? peer_connection_wrapper_->peer_connection()
                                  : nullptr;
}

webrtc::PeerConnectionFactoryInterface*
WebrtcTransport::peer_connection_factory() {
  return peer_connection_wrapper_
             ? peer_connection_wrapper_->peer_connection_factory()
             : nullptr;
}

WebrtcAudioModule* WebrtcTransport::audio_module() {
  return peer_connection_wrapper_ ? peer_connection_wrapper_->audio_module()
                                  : nullptr;
}

std::unique_ptr<MessagePipe> WebrtcTransport::CreateOutgoingChannel(
    const std::string& name) {
  webrtc::DataChannelInit config;
  config.reliable = true;
  auto result = peer_connection()->CreateDataChannelOrError(name, &config);
  if (!result.ok()) {
    LOG(ERROR) << "CreateDataChannelOrError() failed: "
               << result.error().message();
    return nullptr;
  }
  auto data_channel = result.MoveValue();
  if (name == kControlChannelName) {
    DCHECK(!control_data_channel_);
    control_data_channel_ = data_channel;
  } else if (name == kEventChannelName) {
    DCHECK(!event_data_channel_);
    event_data_channel_ = data_channel;
  }
  return std::make_unique<WebrtcDataStreamAdapter>(data_channel);
}

void WebrtcTransport::Start(
    Authenticator* authenticator,
    SendTransportInfoCallback send_transport_info_callback) {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  DCHECK(send_transport_info_callback_.is_null());

  webrtc::ThreadWrapper::EnsureForCurrentMessageLoop();

  // TODO(sergeyu): Investigate if it's possible to avoid Send().
  webrtc::ThreadWrapper::current()->set_send_allowed(true);

  send_transport_info_callback_ = std::move(send_transport_info_callback);

  if (!handshake_hmac_.Init(authenticator->GetAuthKey())) {
    LOG(FATAL) << "HMAC::Init() failed.";
  }

  event_handler_->OnWebrtcTransportConnecting();

  if (transport_context_->role() == TransportRole::SERVER) {
    RequestNegotiation();
  }
}

bool WebrtcTransport::ProcessTransportInfo(XmlElement* transport_info) {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);

  if (transport_info->Name() != QName(kTransportNamespace, "transport")) {
    return false;
  }

  if (!peer_connection()) {
    return false;
  }

  XmlElement* session_description = transport_info->FirstNamed(
      QName(kTransportNamespace, "session-description"));
  if (session_description) {
    webrtc::PeerConnectionInterface::SignalingState expected_state =
        transport_context_->role() == TransportRole::CLIENT
            ? webrtc::PeerConnectionInterface::kStable
            : webrtc::PeerConnectionInterface::kHaveLocalOffer;
    if (peer_connection()->signaling_state() != expected_state) {
      LOG(ERROR) << "Received unexpected WebRTC session_description.";
      return false;
    }

    std::string type = session_description->Attr(QName(std::string(), "type"));
    std::string raw_sdp = session_description->BodyText();
    if (!IsValidSessionDescriptionType(type) || raw_sdp.empty()) {
      LOG(ERROR) << "Incorrect session description format.";
      return false;
    }

    SdpMessage sdp_message(raw_sdp);

    std::string signature_base64 =
        session_description->Attr(QName(std::string(), "signature"));
    std::string signature;
    if (!base::Base64Decode(signature_base64, &signature) ||
        !handshake_hmac_.Verify(
            type + " " + sdp_message.NormalizedForSignature(), signature)) {
      LOG(WARNING) << "Received session-description with invalid signature.";
      bool ignore_error = false;
#if !defined(NDEBUG)
      ignore_error = base::CommandLine::ForCurrentProcess()->HasSwitch(
          kDisableAuthenticationSwitchName);
#endif
      if (!ignore_error) {
        Close(AUTHENTICATION_FAILED);
        return true;
      }
    }

    UpdateCodecParameters(&sdp_message, /*incoming=*/true);

    webrtc::SdpParseError error;
    std::unique_ptr<webrtc::SessionDescriptionInterface>
        webrtc_session_description(webrtc::CreateSessionDescription(
            type, sdp_message.ToString(), &error));
    if (!webrtc_session_description) {
      LOG(ERROR) << "Failed to parse the session description: "
                 << error.description << " line: " << error.line;
      return false;
    }

    {
      ScopedAllowThreadJoinForWebRtcTransport allow_wait;
      peer_connection()->SetRemoteDescription(
          SetSessionDescriptionObserver::Create(base::BindOnce(
              &WebrtcTransport::OnRemoteDescriptionSet,
              weak_factory_.GetWeakPtr(),
              type == webrtc::SessionDescriptionInterface::kOffer)),
          webrtc_session_description.release());
    }

    // SetRemoteDescription() might overwrite any bitrate caps previously set,
    // so (re)apply them here. This might happen if ICE state were already
    // connected and OnIceSelectedCandidatePairChanged() had already set the
    // caps.
    UpdateBitrates();
  }

  XmlElement* candidate_element;
  QName candidate_qname(kTransportNamespace, "candidate");
  for (candidate_element = transport_info->FirstNamed(candidate_qname);
       candidate_element;
       candidate_element = candidate_element->NextNamed(candidate_qname)) {
    std::string candidate_str = candidate_element->BodyText();
    std::string sdp_mid =
        candidate_element->Attr(QName(std::string(), "sdpMid"));
    std::string sdp_mlineindex_str =
        candidate_element->Attr(QName(std::string(), "sdpMLineIndex"));
    int sdp_mlineindex;
    if (candidate_str.empty() || sdp_mid.empty() ||
        !base::StringToInt(sdp_mlineindex_str, &sdp_mlineindex)) {
      LOG(ERROR) << "Failed to parse incoming candidates.";
      return false;
    }

    webrtc::SdpParseError error;
    std::unique_ptr<webrtc::IceCandidateInterface> candidate(
        webrtc::CreateIceCandidate(sdp_mid, sdp_mlineindex, candidate_str,
                                   &error));
    if (!candidate) {
      LOG(ERROR) << "Failed to parse incoming candidate: " << error.description
                 << " line: " << error.line;
      return false;
    }

    if (peer_connection()->signaling_state() ==
        webrtc::PeerConnectionInterface::kStable) {
      if (!peer_connection()->AddIceCandidate(candidate.get())) {
        LOG(ERROR) << "Failed to add incoming ICE candidate.";
        return false;
      }
    } else {
      pending_incoming_candidates_.push_back(std::move(candidate));
    }
  }

  return true;
}

const SessionOptions& WebrtcTransport::session_options() const {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  return session_options_;
}

void WebrtcTransport::SetPreferredBitrates(
    absl::optional<int> min_bitrate_bps,
    absl::optional<int> max_bitrate_bps) {
  preferred_min_bitrate_bps_ = min_bitrate_bps;
  preferred_max_bitrate_bps_ = max_bitrate_bps;
  if (connected_) {
    UpdateBitrates();
  }
}

void WebrtcTransport::RequestIceRestart() {
  if (transport_context_->role() != TransportRole::SERVER) {
    NOTIMPLEMENTED()
        << "ICE restart only implemented for TransportRole::SERVER";
    return;
  }

  if (!connected_) {
    LOG(WARNING) << "Not connected, ignoring ICE restart request.";
    return;
  }

  VLOG(0) << "Restarting ICE due to client request.";
  connected_ = false;
  want_ice_restart_ = true;
  RequestNegotiation();
}

void WebrtcTransport::RequestSdpRestart() {
  if (transport_context_->role() != TransportRole::SERVER) {
    NOTIMPLEMENTED()
        << "SDP restart only implemented for TransportRole::SERVER";
    return;
  }

  if (!connected_) {
    LOG(WARNING) << "Not connected, ignoring SDP restart request.";
    return;
  }

  VLOG(0) << "Restarting SDP due to client request.";
  RequestNegotiation();
}

// static
void WebrtcTransport::SetDataChannelPollingIntervalForTests(
    base::TimeDelta new_polling_interval) {
  data_channel_state_polling_interval = new_polling_interval;
}

// static
void WebrtcTransport::ClosePeerConnection(
    rtc::scoped_refptr<webrtc::DataChannelInterface> control_data_channel,
    rtc::scoped_refptr<webrtc::DataChannelInterface> event_data_channel,
    std::unique_ptr<PeerConnectionWrapper> peer_connection_wrapper,
    base::Time start_time = base::Time::Now()) {
  DCHECK(peer_connection_wrapper);

  if (!control_data_channel || !event_data_channel) {
    LOG(WARNING) << "One or more data channels were not initialized, "
                 << "destroying PeerConnection.";
    base::SingleThreadTaskRunner::GetCurrentDefault()->DeleteSoon(
        FROM_HERE, peer_connection_wrapper.release());
    return;
  }

  if ((base::Time::Now() - start_time) > kWaitForDataChannelsClosedTimeout) {
    LOG(ERROR) << "Timed out waiting for data channels to close, "
               << "destroying PeerConnection.";
    base::SingleThreadTaskRunner::GetCurrentDefault()->DeleteSoon(
        FROM_HERE, peer_connection_wrapper.release());
    return;
  }

  // The data channels should have started the closing process before this
  // function was called.
  DCHECK(control_data_channel->state() == DataChannelState::kClosed ||
         control_data_channel->state() == DataChannelState::kClosing);
  DCHECK(event_data_channel->state() == DataChannelState::kClosed ||
         event_data_channel->state() == DataChannelState::kClosing);

  if (event_data_channel->state() == DataChannelState::kClosed &&
      control_data_channel->state() == DataChannelState::kClosed) {
    VLOG(0) << "Data channels closed, destroying PeerConnection.";
    base::SingleThreadTaskRunner::GetCurrentDefault()->DeleteSoon(
        FROM_HERE, peer_connection_wrapper.release());
    return;
  }

  base::SingleThreadTaskRunner::GetCurrentDefault()->PostDelayedTask(
      FROM_HERE,
      base::BindOnce(&ClosePeerConnection, std::move(control_data_channel),
                     std::move(event_data_channel),
                     std::move(peer_connection_wrapper), start_time),
      data_channel_state_polling_interval);
}

void WebrtcTransport::Close(ErrorCode error) {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  if (!peer_connection_wrapper_) {
    return;
  }

  weak_factory_.InvalidateWeakPtrs();

  // Stop recording into the buffer, otherwise WebRTC might try to record
  // events into the buffer while closing the connection, after |this| has been
  // destroyed.
  StopRtcEventLogging();
  ClosePeerConnection(std::move(control_data_channel_),
                      std::move(event_data_channel_),
                      std::move(peer_connection_wrapper_));

  if (error != OK) {
    event_handler_->OnWebrtcTransportError(error);
  }
}

void WebrtcTransport::ApplySessionOptions(const SessionOptions& options) {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  session_options_ = options;
  absl::optional<std::string> video_codec = options.Get("Video-Codec");
  if (video_codec) {
    preferred_video_codec_ = *video_codec;
  }
}

void WebrtcTransport::OnAudioTransceiverCreated(
    rtc::scoped_refptr<webrtc::RtpTransceiverInterface> transceiver) {}

void WebrtcTransport::OnVideoTransceiverCreated(
    rtc::scoped_refptr<webrtc::RtpTransceiverInterface> transceiver) {
  // Sender is always present, regardless of the direction of media
  // (see rtp_transceiver_interface.h).
  auto sender = transceiver->sender();
  auto [min_bitrate_bps, max_bitrate_bps] = BitratesForConnection();
  SetSenderBitrates(sender, min_bitrate_bps, max_bitrate_bps);
  SetDefaultSenderParameters(sender);
}

void WebrtcTransport::OnLocalSessionDescriptionCreated(
    std::unique_ptr<webrtc::SessionDescriptionInterface> description,
    const std::string& error) {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);

  if (!peer_connection()) {
    return;
  }

  if (!description) {
    LOG(ERROR) << "PeerConnection offer creation failed: " << error;
    Close(CHANNEL_CONNECTION_ERROR);
    return;
  }

  std::string description_sdp;
  if (!description->ToString(&description_sdp)) {
    LOG(ERROR) << "Failed to serialize description.";
    Close(CHANNEL_CONNECTION_ERROR);
    return;
  }

  SdpMessage sdp_message(description_sdp);
  UpdateCodecParameters(&sdp_message, /*incoming=*/false);
  if (preferred_video_codec_.empty()) {
    sdp_message.PreferVideoCodec("VP8");
  } else {
    sdp_message.PreferVideoCodec(preferred_video_codec_);
  }
  description_sdp = sdp_message.ToString();
  webrtc::SdpParseError parse_error;
  description.reset(webrtc::CreateSessionDescription(
      description->type(), description_sdp, &parse_error));
  if (!description) {
    LOG(ERROR) << "Failed to parse the session description: "
               << parse_error.description << " line: " << parse_error.line;
    Close(CHANNEL_CONNECTION_ERROR);
    return;
  }

  // Format and send the session description to the peer.
  std::unique_ptr<XmlElement> transport_info(
      new XmlElement(QName(kTransportNamespace, "transport"), true));
  XmlElement* offer_tag =
      new XmlElement(QName(kTransportNamespace, "session-description"));
  transport_info->AddElement(offer_tag);
  offer_tag->SetAttr(QName(std::string(), "type"), description->type());
  offer_tag->SetBodyText(description_sdp);

  std::string digest;
  digest.resize(handshake_hmac_.DigestLength());
  CHECK(handshake_hmac_.Sign(
      description->type() + " " + sdp_message.NormalizedForSignature(),
      reinterpret_cast<uint8_t*>(&(digest[0])), digest.size()));
  std::string digest_base64;
  base::Base64Encode(digest, &digest_base64);
  offer_tag->SetAttr(QName(std::string(), "signature"), digest_base64);

  send_transport_info_callback_.Run(std::move(transport_info));

  peer_connection()->SetLocalDescription(
      SetSessionDescriptionObserver::Create(base::BindOnce(
          &WebrtcTransport::OnLocalDescriptionSet, weak_factory_.GetWeakPtr())),
      description.release());
}

void WebrtcTransport::OnLocalDescriptionSet(bool success,
                                            const std::string& error) {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);

  if (!peer_connection()) {
    return;
  }

  if (!success) {
    LOG(ERROR) << "Failed to set local description: " << error;
    Close(CHANNEL_CONNECTION_ERROR);
    return;
  }

  AddPendingCandidatesIfPossible();

  // The sender "encodings" parameters are initialized after the local
  // description is set. At this point, it is possible to set parameters such as
  // maximum framerate.
  auto senders = peer_connection()->GetSenders();
  for (const auto& sender : senders) {
    SetDefaultSenderParameters(sender);
  }
}

void WebrtcTransport::OnRemoteDescriptionSet(bool send_answer,
                                             bool success,
                                             const std::string& error) {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);

  if (!peer_connection()) {
    return;
  }

  if (!success) {
    LOG(ERROR) << "Failed to set remote description: " << error;
    Close(CHANNEL_CONNECTION_ERROR);
    return;
  }

  // Create and send answer on the server.
  if (send_answer) {
    const webrtc::PeerConnectionInterface::RTCOfferAnswerOptions options;
    peer_connection()->CreateAnswer(
        CreateSessionDescriptionObserver::Create(
            base::BindOnce(&WebrtcTransport::OnLocalSessionDescriptionCreated,
                           weak_factory_.GetWeakPtr())),
        options);
  }

  AddPendingCandidatesIfPossible();
}

void WebrtcTransport::OnSignalingChange(
    webrtc::PeerConnectionInterface::SignalingState new_state) {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
}

void WebrtcTransport::OnAddStream(
    rtc::scoped_refptr<webrtc::MediaStreamInterface> stream) {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  event_handler_->OnWebrtcTransportMediaStreamAdded(stream);
}

void WebrtcTransport::OnRemoveStream(
    rtc::scoped_refptr<webrtc::MediaStreamInterface> stream) {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  event_handler_->OnWebrtcTransportMediaStreamRemoved(stream);
}

void WebrtcTransport::OnDataChannel(
    rtc::scoped_refptr<webrtc::DataChannelInterface> data_channel) {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  std::string data_channel_name = data_channel->label();
  if (data_channel_name == kControlChannelName) {
    DCHECK(!control_data_channel_);
    control_data_channel_ = data_channel;
  } else if (data_channel_name == kEventChannelName) {
    DCHECK(!event_data_channel_);
    event_data_channel_ = data_channel;
  }
  event_handler_->OnWebrtcTransportIncomingDataChannel(
      data_channel_name,
      std::make_unique<WebrtcDataStreamAdapter>(data_channel));
}

void WebrtcTransport::OnRenegotiationNeeded() {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);

  if (transport_context_->role() == TransportRole::SERVER) {
    RequestNegotiation();
  } else {
    // TODO(sergeyu): Is it necessary to support renegotiation initiated by the
    // client?
    NOTIMPLEMENTED();
  }
}

void WebrtcTransport::OnIceConnectionChange(
    webrtc::PeerConnectionInterface::IceConnectionState new_state) {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);

  if (!connected_ &&
      new_state == webrtc::PeerConnectionInterface::kIceConnectionConnected) {
    connected_ = true;
    connection_relayed_.reset();
    event_handler_->OnWebrtcTransportConnected();
  } else if (connected_ &&
             new_state ==
                 webrtc::PeerConnectionInterface::kIceConnectionDisconnected &&
             transport_context_->role() == TransportRole::SERVER) {
    connected_ = false;
    want_ice_restart_ = true;
    RequestNegotiation();
  }
}

void WebrtcTransport::OnIceGatheringChange(
    webrtc::PeerConnectionInterface::IceGatheringState new_state) {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
}

void WebrtcTransport::OnIceCandidate(
    const webrtc::IceCandidateInterface* candidate) {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);

  std::unique_ptr<XmlElement> candidate_element(
      new XmlElement(QName(kTransportNamespace, "candidate")));
  std::string candidate_str;
  if (!candidate->ToString(&candidate_str)) {
    LOG(ERROR) << "Failed to serialize local candidate.";
    return;
  }
  candidate_element->SetBodyText(candidate_str);
  candidate_element->SetAttr(QName(std::string(), "sdpMid"),
                             candidate->sdp_mid());
  candidate_element->SetAttr(
      QName(std::string(), "sdpMLineIndex"),
      base::NumberToString(candidate->sdp_mline_index()));

  EnsurePendingTransportInfoMessage();
  pending_transport_info_message_->AddElement(candidate_element.release());
}

void WebrtcTransport::OnIceSelectedCandidatePairChanged(
    const cricket::CandidatePairChangeEvent& event) {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);

  std::string transport_protocol =
      GetTransportProtocol(event.selected_candidate_pair);
  if (transport_protocol != transport_protocol_) {
    transport_protocol_ = transport_protocol;
    event_handler_->OnWebrtcTransportProtocolChanged();
  }

  // Unknown -> direct/relayed is treated as a
  // change, so the correct initial bitrate caps are set.
  absl::optional<bool> connection_relayed =
      IsConnectionRelayed(event.selected_candidate_pair);
  if (connection_relayed != connection_relayed_) {
    connection_relayed_ = connection_relayed;
    if (connection_relayed_.has_value()) {
      VLOG(0) << "Relay connection: "
              << (connection_relayed_.value() ? "true" : "false");
    } else {
      LOG(ERROR) << "Connection type unknown, treating as direct.";
    }

    // The max-bitrate needs to be applied even for direct (non-TURN)
    // connections. Otherwise the video-sender b/w estimate is capped to a low
    // default value (~600kbps).
    // Set the global bitrate caps in addition to the VideoSender bitrates. The
    // global caps affect the probing configuration used by b/w estimator.
    UpdateBitrates();
  }

  const cricket::Candidate& local_candidate =
      event.selected_candidate_pair.local_candidate();
  const cricket::Candidate& remote_candidate =
      event.selected_candidate_pair.remote_candidate();

  TransportRoute route;
  static_assert(TransportRoute::DIRECT < TransportRoute::STUN &&
                    TransportRoute::STUN < TransportRoute::RELAY,
                "Route type enum values are ordered by 'indirectness'");
  route.type =
      std::max(CandidateTypeToTransportRouteType(local_candidate.type()),
               CandidateTypeToTransportRouteType(remote_candidate.type()));

  VLOG(0) << "Selected candidate-pair changed, reason = " << event.reason;
  VLOG(0) << "  Local IP = " << local_candidate.address().ToString()
          << ", type = " << local_candidate.type()
          << ", protocol = " << local_candidate.protocol();
  VLOG(0) << "  Remote IP = " << remote_candidate.address().ToString()
          << ", type = " << remote_candidate.type()
          << ", protocol = " << remote_candidate.protocol();

  // Try to convert local and peer addresses. These may sometimes be invalid,
  // for example, a "relay" or "prflx" candidate from a relay connection
  // might have the IP address stripped away by WebRTC - see
  // http://crbug.com/1128667.
  if (!webrtc::SocketAddressToIPEndPoint(remote_candidate.address(),
                                         &route.remote_address)) {
    VLOG(0) << "Peer IP address is invalid.";
  }
  if (!webrtc::SocketAddressToIPEndPoint(local_candidate.address(),
                                         &route.local_address)) {
    VLOG(0) << "Local IP address is invalid.";
  }

  VLOG(0) << "Sending route-changed notification.";
  event_handler_->OnWebrtcTransportRouteChanged(route);
}

std::tuple<int, int> WebrtcTransport::BitratesForConnection() {
  int max_bitrate_bps = kMaxBitrateBps;
  if (connection_relayed_.value_or(false)) {
    int turn_max_rate_kbps = transport_context_->GetTurnMaxRateKbps();
    if (turn_max_rate_kbps <= 0) {
      VLOG(0) << "No TURN bitrate cap set.";
    } else {
      // Apply the TURN bitrate cap to prevent large amounts of packet loss.
      // The Google TURN/relay server limits the connection speed by dropping
      // packets, which may interact badly with WebRTC's bandwidth-estimation.
      VLOG(0) << "Capping bitrate to " << turn_max_rate_kbps << "kbps.";
      max_bitrate_bps = turn_max_rate_kbps * 1000;
    }
  }

  if (preferred_max_bitrate_bps_.has_value()) {
    if (*preferred_max_bitrate_bps_ >= 0 &&
        *preferred_max_bitrate_bps_ <= max_bitrate_bps) {
      VLOG(0) << "Client sets max bitrate to " << *preferred_max_bitrate_bps_
              << " bps.";
      max_bitrate_bps = *preferred_max_bitrate_bps_;
    } else {
      LOG(WARNING) << "Max bitrate setting  " << *preferred_max_bitrate_bps_
                   << " bps ignored since it's not in the range of "
                   << "[0, " << max_bitrate_bps << "].";
    }
  }

  int min_bitrate_bps = 0;
  if (preferred_min_bitrate_bps_.has_value()) {
    if (preferred_min_bitrate_bps_ >= 0 &&
        preferred_min_bitrate_bps_ <= max_bitrate_bps) {
      VLOG(0) << "Client sets min bitrate to " << *preferred_min_bitrate_bps_
              << " bps.";
      min_bitrate_bps = *preferred_min_bitrate_bps_;
    } else {
      LOG(WARNING) << "Min bitrate setting  " << *preferred_min_bitrate_bps_
                   << " bps ignored since it's not in the range of "
                   << "[0, " << max_bitrate_bps << "].";
    }
  }
  return {min_bitrate_bps, max_bitrate_bps};
}

void WebrtcTransport::UpdateBitrates() {
  auto [min_bitrate_bps, max_bitrate_bps] = BitratesForConnection();
  SetPeerConnectionBitrates(min_bitrate_bps, max_bitrate_bps);
  auto senders = peer_connection()->GetSenders();
  for (auto& sender : senders) {
    if (sender->media_type() == cricket::MEDIA_TYPE_VIDEO) {
      SetSenderBitrates(sender, min_bitrate_bps, max_bitrate_bps);
    }
  }
}

void WebrtcTransport::SetPeerConnectionBitrates(int min_bitrate_bps,
                                                int max_bitrate_bps) {
  DCHECK_LE(min_bitrate_bps, max_bitrate_bps);
  webrtc::BitrateSettings bitrate;
  if (min_bitrate_bps > 0) {
    bitrate.min_bitrate_bps = min_bitrate_bps;
  } else {
    bitrate.min_bitrate_bps.reset();
  }
  bitrate.max_bitrate_bps = max_bitrate_bps;
  peer_connection()->SetBitrate(bitrate);
}

void WebrtcTransport::SetSenderBitrates(
    rtc::scoped_refptr<webrtc::RtpSenderInterface> sender,
    int min_bitrate_bps,
    int max_bitrate_bps) {
  DCHECK_LE(min_bitrate_bps, max_bitrate_bps);
  webrtc::RtpParameters parameters = sender->GetParameters();
  if (parameters.encodings.empty()) {
    LOG(ERROR) << "No encodings found for sender " << sender->id();
    return;
  }

  if (parameters.encodings.size() != 1) {
    LOG(ERROR) << "Unexpected number of encodings ("
               << parameters.encodings.size() << ") for sender "
               << sender->id();
  }

  if (min_bitrate_bps > 0) {
    parameters.encodings[0].min_bitrate_bps = min_bitrate_bps;
  } else {
    parameters.encodings[0].min_bitrate_bps.reset();
  }
  parameters.encodings[0].max_bitrate_bps = max_bitrate_bps;

  SetSenderParameters(*sender, parameters);
}

void WebrtcTransport::RequestNegotiation() {
  DCHECK(transport_context_->role() == TransportRole::SERVER);

  if (!negotiation_pending_) {
    negotiation_pending_ = true;
    base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
        FROM_HERE, base::BindOnce(&WebrtcTransport::SendOffer,
                                  weak_factory_.GetWeakPtr()));
  }
}

void WebrtcTransport::SendOffer() {
  DCHECK(transport_context_->role() == TransportRole::SERVER);

  DCHECK(negotiation_pending_);
  negotiation_pending_ = false;

  webrtc::PeerConnectionInterface::RTCOfferAnswerOptions options;
  options.offer_to_receive_video = false;
  options.offer_to_receive_audio = false;
  options.ice_restart = want_ice_restart_;
  peer_connection()->CreateOffer(
      CreateSessionDescriptionObserver::Create(
          base::BindOnce(&WebrtcTransport::OnLocalSessionDescriptionCreated,
                         weak_factory_.GetWeakPtr())),
      options);
}

void WebrtcTransport::EnsurePendingTransportInfoMessage() {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);

  // |transport_info_timer_| must be running iff
  // |pending_transport_info_message_| exists.
  DCHECK_EQ(pending_transport_info_message_ != nullptr,
            transport_info_timer_.IsRunning());

  if (!pending_transport_info_message_) {
    pending_transport_info_message_ = std::make_unique<XmlElement>(
        QName(kTransportNamespace, "transport"), true);

    // Delay sending the new candidates in case we get more candidates
    // that we can send in one message.
    transport_info_timer_.Start(FROM_HERE,
                                base::Milliseconds(kTransportInfoSendDelayMs),
                                this, &WebrtcTransport::SendTransportInfo);
  }
}

void WebrtcTransport::SendTransportInfo() {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  DCHECK(pending_transport_info_message_);

  send_transport_info_callback_.Run(std::move(pending_transport_info_message_));
}

void WebrtcTransport::AddPendingCandidatesIfPossible() {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);

  if (peer_connection()->signaling_state() ==
      webrtc::PeerConnectionInterface::kStable) {
    for (const auto& candidate : pending_incoming_candidates_) {
      if (!peer_connection()->AddIceCandidate(candidate.get())) {
        LOG(ERROR) << "Failed to add incoming candidate";
        Close(INCOMPATIBLE_PROTOCOL);
        return;
      }
    }
    pending_incoming_candidates_.clear();
  }
}

void WebrtcTransport::StartRtcEventLogging() {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  if (!peer_connection()) {
    return;
  }

  // Start recording into |rtc_event_log_|. This is safe because, when |this| is
  // destroyed, it calls Close() which stops recording the RTC event log.
  rtc_event_log_.Clear();
  peer_connection()->StartRtcEventLog(
      std::make_unique<RtcEventLogOutput>(rtc_event_log_));
}

void WebrtcTransport::StopRtcEventLogging() {
  DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
  if (peer_connection()) {
    ScopedAllowThreadJoinForWebRtcTransport allow_wait;
    peer_connection()->StopRtcEventLog();
  }
}

}  // namespace remoting::protocol
